/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package milvus;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.milvus.bulkwriter.*;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.common.clientenum.CloudStorage;
import io.milvus.bulkwriter.common.utils.GeneratorUtils;
import io.milvus.bulkwriter.common.utils.ImportUtils;
import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
import io.milvus.bulkwriter.connect.AzureConnectParam;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.response.BulkImportResponse;
import io.milvus.bulkwriter.response.GetImportProgressResponse;
import io.milvus.bulkwriter.response.ListImportJobsResponse;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.common.utils.ExceptionUtils;
import io.milvus.grpc.*;
import io.milvus.param.*;
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
import io.milvus.param.collection.*;
import io.milvus.param.dml.QueryParam;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.response.GetCollStatResponseWrapper;
import io.milvus.response.QueryResultsWrapper;
import lombok.Data;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.util.Asserts;
import org.apache.logging.log4j.util.Strings;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;


public class BulkWriterExample {
    // Milvus server connection settings read by createConnection().
    // USER_NAME / PASSWORD hold placeholder values; replace them with real credentials.
    public static final String HOST = "127.0.0.1";
    public static final Integer PORT = 19530;
    public static final String USER_NAME = "user.name";
    public static final String PASSWORD = "password";


    /**
     * If you need to transfer the files generated by bulkWriter to the corresponding remote storage (AWS S3, GCP GCS, Azure Blob, Aliyun OSS, Tencent Cloud TOS, MinIO),
     * you need to configure it accordingly; otherwise, you can ignore it.
     */
    public static class StorageConsts {
        // Selects the storage backend: buildStorageConnectParam() and callCloudImport() use the Azure
        // parameters below when this is AZURE, and the S3-compatible parameters otherwise.
        public static final CloudStorage cloudStorage = CloudStorage.MINIO;

        /**
         * If using remote storage such as AWS S3, GCP GCS, Aliyun OSS, Tencent Cloud TOS or MinIO,
         * please configure the following parameters.
         * <p>
         * STORAGE_ENDPOINT is computed from cloudStorage, so it must stay declared after that field
         * (static fields are initialized in textual order). NOTE(review): the argument looks like the
         * MinIO address used when cloudStorage is MINIO; confirm against CloudStorage.getEndpoint.
         */
        public static final String STORAGE_ENDPOINT = cloudStorage.getEndpoint("http://127.0.0.1:9000");
        public static final String STORAGE_BUCKET = "storage.bucket";
        public static final String STORAGE_ACCESS_KEY = "storage.access.key";
        public static final String STORAGE_SECRET_KEY = "storage.secret.key";
        /**
         * Region passed to the S3 connection and to the S3 object URL.
         * If using local storage such as MinIO, please set this parameter to an empty string.
         */
        public static final String STORAGE_REGION = "storage.region";

        /**
         * If using remote storage such as Azure Blob,
         * please configure the following parameters.
         */
        public static final String AZURE_CONTAINER_NAME = "azure.container.name";
        public static final String AZURE_ACCOUNT_NAME = "azure.account.name";
        public static final String AZURE_ACCOUNT_KEY = "azure.account.key";
    }


    /**
     * If you have used remoteBulkWriter to generate remote data and want to import data using the Import interface on Zilliz Cloud after generation,
     * you don't need to configure the following object-related parameters (OBJECT_URL, OBJECT_ACCESS_KEY, OBJECT_SECRET_KEY). You can call the callCloudImport method, as the internal logic has been encapsulated for you.
     * <p>
     * If you already have data stored in remote storage (not generated through remoteBulkWriter), and you want to invoke the Import interface on Zilliz Cloud to import data,
     * you need to configure the following parameters and then follow the exampleCloudImport method.
     * <p>
     * If you do not need to import data through the Import interface on Zilliz Cloud, you can ignore this.
     */
    public static class CloudImportConsts {

        /**
         * Zilliz Cloud controller endpoint.
         * If you are an overseas user, you can use the following endpoint format: https://controller.api.{cloud-region}.zillizcloud.com/v1/vector/collections/import.
         * If not, you can use the following endpoint format: https://controller.api.{cloud-region}.cloud.zilliz.com.cn/v1/vector/collections/import.
         * NOTE(review): these examples include the /v1/vector/collections/import path while the placeholder
         * below does not; confirm whether CloudImport appends the path itself.
         */
        public static final String CLOUD_ENDPOINT = "https://controller.api.${CLOUD-REGION}.{ENDPOINT-SUFFIX}";
        // Zilliz Cloud API key, target cluster id and target collection name (placeholders to replace).
        public static final String API_KEY = "_api_key_of_the_user";
        public static final String CLUSTER_ID = "_your_cloud_instance_id_";
        public static final String COLLECTION_NAME = "_collection_name_on_the_cloud_";

        /**
         * Please provide the complete URL for the file or folder you want to import, similar to https://bucket-name.s3.region-code.amazonaws.com/object-name.
         * For more details, you can refer to https://docs.zilliz.com/docs/import-data-on-web-ui.
         * OBJECT_ACCESS_KEY / OBJECT_SECRET_KEY are the credentials of that storage; these three values
         * are read by exampleCloudImport().
         */
        public static final String OBJECT_URL = "_your_storage_object_url_";
        public static final String OBJECT_ACCESS_KEY = "_your_storage_access_key_";
        public static final String OBJECT_SECRET_KEY = "_your_storage_secret_key_";
    }

    // Collection created by exampleSimpleCollection() for the path/vector/label rows.
    private static final String SIMPLE_COLLECTION_NAME = "for_bulkwriter";
    // Collection that the all-field-types examples create, import into, index, load and query.
    private static final String ALL_TYPES_COLLECTION_NAME = "all_types_for_bulkwriter";
    // Length of the float vectors generated for the simple-collection rows.
    // Presumably must match the vector dimension in buildSimpleCollection(); confirm.
    private static final Integer DIM = 512;
    // Assigned by createConnection(); null until then.
    private MilvusClient milvusClient;

    /**
     * Entry point: connects to Milvus, then runs the simple-collection and all-field-types examples
     * for every file type in the list below.
     */
    public static void main(String[] args) throws Exception {
        BulkWriterExample example = new BulkWriterExample();
        example.createConnection();

        List<BulkFileType> fileTypes = Lists.newArrayList(BulkFileType.PARQUET);

        exampleSimpleCollection(example, fileTypes);
        exampleAllTypeCollectionRemote(example, fileTypes);

        // The cloud import API requires a Zilliz Cloud service (https://zilliz.com/cloud); enable when available:
        // exampleCloudImport();
    }

    /**
     * Builds a Milvus client from HOST/PORT/USER_NAME/PASSWORD and stores it in milvusClient.
     */
    private void createConnection() {
        System.out.println("\nCreate connection...");
        milvusClient = new MilvusServiceClient(ConnectParam.newBuilder()
                .withHost(HOST)
                .withPort(PORT)
                .withAuthorization(USER_NAME, PASSWORD)
                .build());
        System.out.println("\nConnected");
    }

    /**
     * Runs the simple-collection examples: creates the collection (an existing one is kept), writes
     * every requested file type locally and then remotely, and finally runs the multi-threaded append check.
     */
    private static void exampleSimpleCollection(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
        CollectionSchemaParam schema = exampleBulkWriter.buildSimpleCollection();
        exampleBulkWriter.createCollection(SIMPLE_COLLECTION_NAME, schema, false);

        // Local output for every format first ...
        for (int idx = 0; idx < fileTypes.size(); idx++) {
            localWriter(schema, fileTypes.get(idx));
        }
        // ... then the same kind of data to remote storage.
        for (int idx = 0; idx < fileTypes.size(); idx++) {
            remoteWriter(schema, fileTypes.get(idx));
        }

        // Several threads appending into one writer.
        parallelAppend(schema);
    }

    /**
     * Runs the all-field-types examples for every file type:
     * float vectors via bulkInsert, binary vectors via bulkInsert, and float vectors via the cloud import API.
     * Each scenario writes its data remotely, imports it into ALL_TYPES_COLLECTION_NAME, and queries it back.
     */
    private static void exampleAllTypeCollectionRemote(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
        // float vectors + all scalar types, use bulkInsert interface
        for (BulkFileType fileType : fileTypes) {
            CollectionSchemaParam collectionSchema = buildAllTypeSchema(false, true);
            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(false, collectionSchema, fileType);
            exampleBulkWriter.callBulkInsert(collectionSchema, batchFiles);
            exampleBulkWriter.retrieveImportData(false);
        }

        // binary vectors + all scalar types, use bulkInsert interface
        for (BulkFileType fileType : fileTypes) {
            CollectionSchemaParam collectionSchema = buildAllTypeSchema(true, true);
            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(true, collectionSchema, fileType);
            exampleBulkWriter.callBulkInsert(collectionSchema, batchFiles);
            exampleBulkWriter.retrieveImportData(true);
        }

        // float vectors + all scalar types, use cloud import api.
        // You need to apply a cloud service from Zilliz Cloud(https://zilliz.com/cloud)
        for (BulkFileType fileType : fileTypes) {
            CollectionSchemaParam collectionSchema = buildAllTypeSchema(false, true);
            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(false, collectionSchema, fileType);
            // Drop and recreate: the previous scenario left ALL_TYPES_COLLECTION_NAME with a binary-vector
            // schema, which does not match the float-vector data written above.
            exampleBulkWriter.createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);
            exampleBulkWriter.callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME);
            exampleBulkWriter.retrieveImportData(false);
        }
    }

    /**
     * Writes the CSV sample rows plus 100000 generated rows into local files of the given type
     * under /tmp/bulk_writer, then prints the produced batch files.
     */
    private static void localWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
        System.out.printf("\n===================== local writer (%s) ====================%n", fileType.name());
        final int chunkSize = 128 * 1024 * 1024;
        LocalBulkWriterParam writerParam = LocalBulkWriterParam.newBuilder()
                .withCollectionSchema(collectionSchema)
                .withLocalPath("/tmp/bulk_writer")
                .withFileType(fileType)
                .withChunkSize(chunkSize)
                .build();

        try (LocalBulkWriter writer = new LocalBulkWriter(writerParam)) {
            // seed with the rows of the sample CSV resource
            readCsvSampleData("data/train_embeddings.csv", writer);

            // then generated rows whose path and label share the same index suffix
            int generated = 0;
            while (generated < 100000) {
                JSONObject row = new JSONObject();
                row.put("path", "path_" + generated);
                row.put("vector", GeneratorUtils.genFloatVector(DIM));
                row.put("label", "label_" + generated);
                writer.appendRow(row);
                generated++;
            }

            System.out.printf("%s rows appends%n", writer.getTotalRowCount());
            System.out.printf("%s rows in buffer not flushed%n", writer.getBufferRowCount());

            writer.commit(false);
            System.out.printf("Local writer done! output local files: %s%n", writer.getBatchFiles());
        } catch (Exception e) {
            System.out.println("localWriter catch exception: " + e);
            throw e;
        }
    }

    /**
     * Writes the CSV sample rows plus 100000 generated rows with a RemoteBulkWriter in the given
     * file type, then prints the produced remote batch files.
     */
    private static void remoteWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
        System.out.printf("\n===================== remote writer (%s) ====================%n", fileType.name());

        try (RemoteBulkWriter writer = buildRemoteBulkWriter(collectionSchema, fileType)) {
            // seed with the rows of the sample CSV resource
            readCsvSampleData("data/train_embeddings.csv", writer);

            // then generated rows whose path and label share the same index suffix
            int generated = 0;
            while (generated < 100000) {
                JSONObject row = new JSONObject();
                row.put("path", "path_" + generated);
                row.put("vector", GeneratorUtils.genFloatVector(DIM));
                row.put("label", "label_" + generated);
                writer.appendRow(row);
                generated++;
            }

            System.out.printf("%s rows appends%n", writer.getTotalRowCount());
            System.out.printf("%s rows in buffer not flushed%n", writer.getBufferRowCount());

            writer.commit(false);
            System.out.printf("Remote writer done! output remote files: %s%n", writer.getBatchFiles());
        } catch (Exception e) {
            System.out.println("remoteWriter catch exception: " + e);
            throw e;
        }
    }

    /**
     * Appends rows from several threads into one LocalBulkWriter, commits, then reads the generated
     * Parquet files back and checks that the total row count matches and that every row's path and
     * label carry the same suffix.
     *
     * @param collectionSchema schema of the simple collection (path / vector / label fields)
     * @throws Exception if writing, committing, reading back, or the final row-count check fails
     */
    private static void parallelAppend(CollectionSchemaParam collectionSchema) throws Exception {
        // println (not print) so the following per-thread log lines start on their own line
        System.out.println("\n===================== parallel append ====================");
        LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
                .withCollectionSchema(collectionSchema)
                .withLocalPath("/tmp/bulk_writer")
                .withFileType(BulkFileType.PARQUET)
                .withChunkSize(128 * 1024 * 1024)  // 128MB
                .build();

        try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
            int threadCount = 10;
            int rowsPerThread = 1000;
            long expectedRows = (long) threadCount * rowsPerThread;

            List<Thread> threads = new ArrayList<>(threadCount);
            for (int i = 0; i < threadCount; ++i) {
                // each thread writes a disjoint index range [begin, end)
                int begin = i * rowsPerThread;
                int end = begin + rowsPerThread;
                Thread thread = new Thread(() -> appendRow(localBulkWriter, begin, end));
                threads.add(thread);
                thread.start();
                System.out.printf("Thread %s started%n", thread.getName());
            }

            for (Thread thread : threads) {
                thread.join();
                System.out.printf("Thread %s finished%n", thread.getName());
            }

            System.out.println(localBulkWriter.getTotalRowCount() + " rows appends");
            System.out.println(localBulkWriter.getBufferRowCount() + " rows in buffer not flushed");
            localBulkWriter.commit(false);
            System.out.printf("Append finished, %s rows%n", expectedRows);

            // readParquet returns long; keep the accumulator long to avoid silent narrowing.
            long rowCount = 0;
            for (List<String> batch : localBulkWriter.getBatchFiles()) {
                for (String filePath : batch) {
                    rowCount += readParquet(filePath);
                }
            }

            Asserts.check(rowCount == expectedRows, String.format("rowCount %s not equals expected %s", rowCount, expectedRows));
            System.out.println("Data is correct");
        } catch (Exception e) {
            System.out.println("parallelAppend catch exception: " + e);
            throw e;
        }
    }

    /**
     * Reads one Parquet file, counting its records and checking that each record's "path" and
     * "label" values share the same suffix after stripping "path_" / "label_".
     *
     * @param localFilePath path of the Parquet file to read
     * @return number of records in the file
     */
    private static long readParquet(String localFilePath) throws Exception {
        final long[] counted = new long[1];
        ParquetReaderUtils reader = new ParquetReaderUtils() {
            @Override
            public void readRecord(GenericData.Record record) {
                counted[0] += 1;
                String path = record.get("path").toString();
                String label = record.get("label").toString();
                String pathSuffix = path.replace("path_", "");
                String labelSuffix = label.replace("label_", "");
                Asserts.check(pathSuffix.equals(labelSuffix), String.format("the suffix of %s not equals the suffix of %s", path, label));
            }
        };
        reader.readParquet(localFilePath);

        long total = counted[0];
        System.out.printf("The file %s contains %s rows. Verify the content...%n", localFilePath, total);
        return total;
    }

    /**
     * Appends rows with indices {@code begin} (inclusive) to {@code end} (exclusive) to {@code writer}.
     * Each row has path "path_i", label "label_i" and a vector from GeneratorUtils.genFloatVector(DIM).
     * <p>
     * Runs on worker threads started by parallelAppend(). A failure is reported (with its cause and the
     * failing row index) rather than rethrown, and the remaining rows of this range are skipped;
     * parallelAppend's final row-count check then detects the missing rows.
     */
    private static void appendRow(LocalBulkWriter writer, int begin, int end) {
        int current = begin;
        try {
            for (; current < end; ++current) {
                JSONObject row = new JSONObject();
                row.put("path", "path_" + current);
                row.put("vector", GeneratorUtils.genFloatVector(DIM));
                row.put("label", "label_" + current);

                writer.appendRow(row);
                if (current % 100 == 0) {
                    System.out.printf("%s inserted %s items%n", Thread.currentThread().getName(), current - begin);
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for the owner of this thread.
                Thread.currentThread().interrupt();
            }
            System.out.printf("failed to append row! thread=%s, row=%s, cause: %s%n",
                    Thread.currentThread().getName(), current, e);
        }
    }

    /**
     * Generates 10000 rows covering every scalar, vector and array field type, writes them with a
     * RemoteBulkWriter, and returns the uploaded batch files.
     *
     * @param binVec           true to generate binary vectors, false for float vectors (128 dimensions in both cases)
     * @param collectionSchema schema the rows are written against
     * @param fileType         output file format
     * @return the remote batch files produced by the writer
     */
    private List<List<String>> allTypesRemoteWriter(boolean binVec, CollectionSchemaParam collectionSchema, BulkFileType fileType) throws Exception {
        System.out.printf("\n===================== all field types (%s) binary_vector=%s ====================%n", fileType.name(), binVec);

        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
            System.out.println("Append rows");
            int batchCount = 10000;

            for (int i = 0; i < batchCount; ++i) {
                JSONObject rowObject = new JSONObject();

                // scalar field
                rowObject.put("id", i);
                rowObject.put("bool", i % 5 == 0);
                rowObject.put("int8", i % 128);
                rowObject.put("int16", i % 1000);
                rowObject.put("int32", i % 100000);
                // Floating-point division: `i / 3` would be integer division and drop the fraction.
                rowObject.put("float", i / 3.0f);
                rowObject.put("double", i / 7.0);
                rowObject.put("varchar", "varchar_" + i);
                rowObject.put("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));

                // vector field
                rowObject.put("vector", binVec ? GeneratorUtils.generatorBinaryVector(128) : GeneratorUtils.generatorFloatValue(128));

                // array field
                rowObject.put("arrayInt64", GeneratorUtils.generatorLongValue(10));
                rowObject.put("arrayVarchar", GeneratorUtils.generatorVarcharValue(10, 10));
                rowObject.put("arrayInt8", GeneratorUtils.generatorInt8Value(10));
                rowObject.put("arrayInt16", GeneratorUtils.generatorInt16Value(10));
                rowObject.put("arrayInt32", GeneratorUtils.generatorInt32Value(10));
                rowObject.put("arrayFloat", GeneratorUtils.generatorFloatValue(10));
                rowObject.put("arrayDouble", GeneratorUtils.generatorDoubleValue(10));
                rowObject.put("arrayBool", GeneratorUtils.generatorBoolValue(10));

                remoteBulkWriter.appendRow(rowObject);
            }
            System.out.printf("%s rows appends%n", remoteBulkWriter.getTotalRowCount());
            System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
            System.out.println("Generate data files...");
            remoteBulkWriter.commit(false);

            List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
            System.out.printf("Data files have been uploaded: %s%n", batchFiles);
            return batchFiles;
        } catch (Exception e) {
            System.out.println("allTypesRemoteWriter catch exception: " + e);
            throw e;
        }
    }

    /**
     * Creates a RemoteBulkWriter that writes {@code fileType} files (512MB chunks) under "bulk_data"
     * on the storage configured in StorageConsts.
     */
    private static RemoteBulkWriter buildRemoteBulkWriter(CollectionSchemaParam collectionSchema, BulkFileType fileType) throws IOException {
        StorageConnectParam storageParam = buildStorageConnectParam();
        final int chunkSize = 512 * 1024 * 1024;
        RemoteBulkWriterParam writerParam = RemoteBulkWriterParam.newBuilder()
                .withCollectionSchema(collectionSchema)
                .withRemotePath("bulk_data")
                .withFileType(fileType)
                .withChunkSize(chunkSize)
                .withConnectParam(storageParam)
                .build();
        RemoteBulkWriter writer = new RemoteBulkWriter(writerParam);
        return writer;
    }

    /**
     * Builds the storage connection from StorageConsts: an Azure connection string when cloudStorage
     * is AZURE, otherwise an S3-compatible connection (endpoint, bucket, keys, region).
     */
    private static StorageConnectParam buildStorageConnectParam() {
        if (StorageConsts.cloudStorage != CloudStorage.AZURE) {
            return S3ConnectParam.newBuilder()
                    .withEndpoint(StorageConsts.STORAGE_ENDPOINT)
                    .withBucketName(StorageConsts.STORAGE_BUCKET)
                    .withAccessKey(StorageConsts.STORAGE_ACCESS_KEY)
                    .withSecretKey(StorageConsts.STORAGE_SECRET_KEY)
                    .withRegion(StorageConsts.STORAGE_REGION)
                    .build();
        }

        String azureConnStr = "DefaultEndpointsProtocol=https;AccountName=" + StorageConsts.AZURE_ACCOUNT_NAME
                + ";AccountKey=" + StorageConsts.AZURE_ACCOUNT_KEY + ";EndpointSuffix=core.windows.net";
        return AzureConnectParam.newBuilder()
                .withConnStr(azureConnStr)
                .withContainerName(StorageConsts.AZURE_CONTAINER_NAME)
                .build();
    }

    /**
     * Reads the CSV classpath resource {@code filePath} (first line is a header whose column names map
     * to CsvDataObject's fields) and appends one row per record to {@code writer}.
     *
     * @param filePath classpath-relative resource path, e.g. "data/train_embeddings.csv"
     * @param writer   writer that receives a row with "vector", "label" and "path" for every CSV record
     * @throws FileNotFoundException if the resource is not on the classpath
     * @throws IOException           if reading the CSV fails
     */
    private static void readCsvSampleData(String filePath, BulkWriter writer) throws IOException, InterruptedException {
        URL resourceUrl = BulkWriterExample.class.getClassLoader().getResource(filePath);
        // `assert` is disabled by default at runtime, so check explicitly instead of failing later with an NPE.
        if (resourceUrl == null) {
            throw new FileNotFoundException("Sample data not found on classpath: " + filePath);
        }

        CsvMapper csvMapper = new CsvMapper();
        CsvSchema csvSchema = CsvSchema.builder().setUseHeader(true).build();
        // Read through the URL's stream instead of new File(url.getFile()): this also works for resources
        // packed in jars and for paths with percent-encoded characters, and the stream is always closed.
        try (InputStream input = resourceUrl.openStream()) {
            Iterator<CsvDataObject> iterator = csvMapper.readerFor(CsvDataObject.class).with(csvSchema).readValues(input);
            while (iterator.hasNext()) {
                CsvDataObject dataObject = iterator.next();
                JSONObject row = new JSONObject();

                row.put("vector", dataObject.toFloatArray());
                row.put("label", dataObject.getLabel());
                row.put("path", dataObject.getPath());

                writer.appendRow(row);
            }
        }
    }

    /**
     * One record of the sample CSV file. Jackson fills the fields from the CSV columns; Lombok's
     * {@code @Data} generates the getters used when converting records to rows.
     */
    @Data
    private static class CsvDataObject {
        // Shared parser and target type, hoisted so they are not re-created for every CSV record.
        // Static fields are ignored by both Lombok's @Data and Jackson's field binding.
        private static final Gson GSON = new Gson();
        private static final TypeToken<List<Float>> FLOAT_LIST_TYPE = new TypeToken<List<Float>>() {
        };

        @JsonProperty
        private String vector;
        @JsonProperty
        private String path;
        @JsonProperty
        private String label;

        /**
         * Parses the {@code vector} column as a JSON array of numbers.
         *
         * @return the parsed values, or null if the column is null
         */
        public List<Float> toFloatArray() {
            return GSON.fromJson(vector, FLOAT_LIST_TYPE.getType());
        }
    }

    /**
     * Recreates ALL_TYPES_COLLECTION_NAME from {@code collectionSchema}, submits one bulkInsert task per
     * batch of files, and polls every 5 seconds until each task has completed or failed. Finally it prints
     * the collection's row count.
     *
     * @param collectionSchema schema used to (re)create the collection
     * @param batchFiles       file batches produced by a bulk writer; each batch becomes one bulkInsert task
     * @throws InterruptedException if interrupted while waiting between state checks
     */
    private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
        System.out.println("\n===================== call bulkInsert ====================");
        // Drop and recreate: this method is called with different schemas (float vs. binary vectors) for
        // the same collection name, so an existing collection may not match collectionSchema.
        createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);

        List<Long> taskIds = new ArrayList<>();
        for (List<String> batch : batchFiles) {
            Long taskId = bulkInsert(batch);
            taskIds.add(taskId);
            System.out.println("Create a bulkInsert task, task id: " + taskId);
        }

        while (!taskIds.isEmpty()) {
            // Wait once per polling round, not once per pending task.
            System.out.println("Wait 5 second to check bulkInsert tasks state...");
            TimeUnit.SECONDS.sleep(5);

            List<Long> pendingTaskIds = new ArrayList<>();
            for (Long taskId : taskIds) {
                GetImportStateResponse bulkInsertState = getBulkInsertState(taskId);
                ImportState state = bulkInsertState.getState();
                if (state == ImportState.ImportFailed || state == ImportState.ImportFailedAndCleaned) {
                    String failedReason = bulkInsertState.getInfosList().stream()
                            .filter(kv -> "failed_reason".equals(kv.getKey()))
                            .map(KeyValuePair::getValue)
                            .findFirst()
                            .orElse(Strings.EMPTY);
                    System.out.printf("The task %s failed, reason: %s%n", taskId, failedReason);
                } else if (state == ImportState.ImportCompleted) {
                    System.out.printf("The task %s completed%n", taskId);
                } else {
                    System.out.printf("The task %s is running, state:%s%n", taskId, state);
                    pendingTaskIds.add(taskId);
                }
            }
            taskIds = pendingTaskIds;
        }

        System.out.println("Collection row number: " + getCollectionStatistics());
    }

    /**
     * Imports the given remote batch files into {@code collectionName} through the Zilliz Cloud import API.
     * It polls the job every 5 seconds until the job reports full progress or an error message.
     * Afterwards it prints the row count of ALL_TYPES_COLLECTION_NAME as reported by the local milvusClient.
     * NOTE(review): the loop has no timeout; it spins forever if the job never reaches either state.
     */
    private void callCloudImport(List<List<String>> batchFiles, String collectionName) throws InterruptedException, MalformedURLException {
        System.out.println("\n===================== call cloudImport ====================");

        String objectUrl;
        String accessKey;
        String secretKey;
        if (StorageConsts.cloudStorage == CloudStorage.AZURE) {
            objectUrl = StorageConsts.cloudStorage.getAzureObjectUrl(StorageConsts.AZURE_ACCOUNT_NAME, StorageConsts.AZURE_CONTAINER_NAME, ImportUtils.getCommonPrefix(batchFiles));
            accessKey = StorageConsts.AZURE_ACCOUNT_NAME;
            secretKey = StorageConsts.AZURE_ACCOUNT_KEY;
        } else {
            objectUrl = StorageConsts.cloudStorage.getS3ObjectUrl(StorageConsts.STORAGE_BUCKET, ImportUtils.getCommonPrefix(batchFiles), StorageConsts.STORAGE_REGION);
            accessKey = StorageConsts.STORAGE_ACCESS_KEY;
            secretKey = StorageConsts.STORAGE_SECRET_KEY;
        }

        BulkImportResponse importResponse = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, objectUrl, accessKey, secretKey, CloudImportConsts.CLUSTER_ID, collectionName);
        String jobId = importResponse.getJobId();
        System.out.println("Create a cloudImport job, job id: " + jobId);

        for (boolean done = false; !done; ) {
            System.out.println("Wait 5 second to check bulkInsert job state...");
            TimeUnit.SECONDS.sleep(5);

            GetImportProgressResponse progress = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, jobId, CloudImportConsts.CLUSTER_ID);
            if (progress.getReadyPercentage().intValue() == 1) {
                System.out.printf("The job %s completed%n", jobId);
                done = true;
            } else if (StringUtils.isNotEmpty(progress.getErrorMessage())) {
                System.out.printf("The job %s failed, reason: %s%n", jobId, progress.getErrorMessage());
                done = true;
            } else {
                System.out.printf("The job %s is running, progress:%s%n", jobId, progress.getReadyPercentage());
            }
        }

        System.out.println("Collection row number: " + getCollectionStatistics());
    }

    /**
     * Creates {@code collectionName} from {@code collectionSchema}, optionally dropping an existing one first.
     * Every RPC response is checked with ExceptionUtils.handleResponseStatus, so a failed call is reported
     * instead of being ignored.
     *
     * @param collectionName   name of the collection to create
     * @param collectionSchema collection info
     * @param dropIfExist      if the collection already exists: true drops it and creates it again,
     *                         false keeps the existing collection unchanged
     */
    private void createCollection(String collectionName, CollectionSchemaParam collectionSchema, boolean dropIfExist) {
        System.out.println("\n===================== create collection ====================");
        checkMilvusClientIfExist();
        CreateCollectionParam collectionParam = CreateCollectionParam.newBuilder()
                .withCollectionName(collectionName)
                .withSchema(collectionSchema)
                .build();
        R<Boolean> hasCollection = milvusClient.hasCollection(HasCollectionParam.newBuilder().withCollectionName(collectionName).build());
        // getData() is not meaningful (may be null) when the call itself failed.
        ExceptionUtils.handleResponseStatus(hasCollection);

        if (Boolean.TRUE.equals(hasCollection.getData())) {
            if (!dropIfExist) {
                System.out.printf("Collection %s already exists, keep it%n", collectionName);
                return;
            }
            R<RpcStatus> dropResponse = milvusClient.dropCollection(DropCollectionParam.newBuilder().withCollectionName(collectionName).build());
            ExceptionUtils.handleResponseStatus(dropResponse);
        }

        R<RpcStatus> createResponse = milvusClient.createCollection(collectionParam);
        ExceptionUtils.handleResponseStatus(createResponse);
        System.out.printf("Collection %s created%n", collectionName);
    }

    /**
     * Indexes and loads ALL_TYPES_COLLECTION_NAME, then queries ids 100 and 5000 and prints the
     * returned records (all fields plus "vector").
     *
     * @param binVec true if the collection holds binary vectors (selects the index/metric type)
     */
    private void retrieveImportData(boolean binVec) {
        createIndex(binVec);

        List<Integer> queryIds = Lists.newArrayList(100, 5000);
        System.out.printf("Load collection and query items %s%n", queryIds);
        loadCollection();

        String filter = String.format("id in %s", queryIds);
        System.out.println(filter);

        List<QueryResultsWrapper.RowRecord> results = query(filter, Lists.newArrayList("*", "vector"));
        System.out.println("Query results:");
        results.forEach(System.out::println);
    }

    /**
     * Synchronously creates index "index_name" on the "vector" field of ALL_TYPES_COLLECTION_NAME:
     * BIN_FLAT/HAMMING for binary vectors, FLAT/L2 for float vectors.
     */
    private void createIndex(boolean binVec) {
        System.out.println("Create index...");
        checkMilvusClientIfExist();

        IndexType indexType = binVec ? IndexType.BIN_FLAT : IndexType.FLAT;
        MetricType metricType = binVec ? MetricType.HAMMING : MetricType.L2;
        CreateIndexParam indexParam = CreateIndexParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withFieldName("vector")
                .withIndexName("index_name")
                .withSyncMode(Boolean.TRUE)
                .withIndexType(indexType)
                .withMetricType(metricType)
                .build();

        ExceptionUtils.handleResponseStatus(milvusClient.createIndex(indexParam));
    }

    /**
     * Loads ALL_TYPES_COLLECTION_NAME and returns the checked response.
     */
    private R<RpcStatus> loadCollection() {
        System.out.println("Loading Collection...");
        checkMilvusClientIfExist();
        LoadCollectionParam loadParam = LoadCollectionParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .build();
        R<RpcStatus> status = milvusClient.loadCollection(loadParam);
        ExceptionUtils.handleResponseStatus(status);
        return status;
    }

    /**
     * Runs a query with filter {@code expr} on ALL_TYPES_COLLECTION_NAME and returns its rows.
     *
     * @param expr         boolean filter expression
     * @param outputFields fields to return for each row
     */
    private List<QueryResultsWrapper.RowRecord> query(String expr, List<String> outputFields) {
        System.out.println("========== query() ==========");
        checkMilvusClientIfExist();
        QueryParam queryParam = QueryParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withExpr(expr)
                .withOutFields(outputFields)
                .build();
        R<QueryResults> results = milvusClient.query(queryParam);
        ExceptionUtils.handleResponseStatus(results);
        return new QueryResultsWrapper(results.getData()).getRowRecords();
    }

    /**
     * Submits one bulkInsert task for {@code batchFiles} into ALL_TYPES_COLLECTION_NAME.
     *
     * @return the first task id reported by the server
     */
    private Long bulkInsert(List<String> batchFiles) {
        System.out.println("========== bulkInsert() ==========");
        checkMilvusClientIfExist();
        BulkInsertParam insertParam = BulkInsertParam.newBuilder()
                .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                .withFiles(batchFiles)
                .build();
        R<ImportResponse> importResponse = milvusClient.bulkInsert(insertParam);
        ExceptionUtils.handleResponseStatus(importResponse);
        List<Long> taskIds = importResponse.getData().getTasksList();
        return taskIds.get(0);
    }

    /**
     * Fetches the current state of bulkInsert task {@code taskId}.
     * The response status is checked, consistent with the other RPC helpers in this class, so callers
     * never receive null data from a failed call.
     *
     * @param taskId id returned by bulkInsert
     * @return the task's import state response
     */
    private GetImportStateResponse getBulkInsertState(Long taskId) {
        System.out.println("========== getBulkInsertState() ==========");
        checkMilvusClientIfExist();
        R<GetImportStateResponse> bulkInsertState = milvusClient.getBulkInsertState(GetBulkInsertStateParam.newBuilder()
                .withTask(taskId)
                .build());
        ExceptionUtils.handleResponseStatus(bulkInsertState);
        return bulkInsertState.getData();
    }

    /**
     * Flushes ALL_TYPES_COLLECTION_NAME and returns its row count from getCollectionStatistics.
     *
     * @return the collection's row count
     */
    private Long getCollectionStatistics() {
        System.out.println("========== getCollectionStatistics() ==========");
        checkMilvusClientIfExist();
        // call flush() to flush the insert buffer to storage,
        // so that the getCollectionStatistics() can get correct number.
        // Its status is checked too: a failed flush would otherwise silently yield a stale count.
        ExceptionUtils.handleResponseStatus(
                milvusClient.flush(FlushParam.newBuilder().addCollectionName(ALL_TYPES_COLLECTION_NAME).build()));
        R<GetCollectionStatisticsResponse> response = milvusClient.getCollectionStatistics(
                GetCollectionStatisticsParam.newBuilder()
                        .withCollectionName(ALL_TYPES_COLLECTION_NAME)
                        .build());
        ExceptionUtils.handleResponseStatus(response);
        GetCollStatResponseWrapper wrapper = new GetCollStatResponseWrapper(response.getData());
        return wrapper.getRowCount();
    }

    /**
     * Walks through the cloud import helpers.
     *
     * <p>It starts an import job, fetches that job's progress, then lists the import jobs of the
     * configured cluster. Each response is printed as JSON.
     *
     * @throws MalformedURLException declared by this method; presumably raised by the
     *                               {@code CloudImport} helpers when an endpoint is malformed
     */
    private static void exampleCloudImport() throws MalformedURLException {
        // one serializer is enough for all three responses
        Gson printer = new Gson();

        System.out.println("\n===================== import files to cloud vectordb ====================");
        BulkImportResponse importJob = CloudImport.bulkImport(
                CloudImportConsts.CLOUD_ENDPOINT,
                CloudImportConsts.API_KEY,
                CloudImportConsts.OBJECT_URL,
                CloudImportConsts.OBJECT_ACCESS_KEY,
                CloudImportConsts.OBJECT_SECRET_KEY,
                CloudImportConsts.CLUSTER_ID,
                CloudImportConsts.COLLECTION_NAME);
        System.out.println(printer.toJson(importJob));

        System.out.println("\n===================== get import job progress ====================");
        GetImportProgressResponse progress = CloudImport.getImportProgress(
                CloudImportConsts.CLOUD_ENDPOINT,
                CloudImportConsts.API_KEY,
                importJob.getJobId(),
                CloudImportConsts.CLUSTER_ID);
        System.out.println(printer.toJson(progress));

        System.out.println("\n===================== list import jobs ====================");
        ListImportJobsResponse jobs = CloudImport.listImportJobs(
                CloudImportConsts.CLOUD_ENDPOINT,
                CloudImportConsts.API_KEY,
                CloudImportConsts.CLUSTER_ID,
                10,
                1);
        System.out.println(printer.toJson(jobs));
    }

    /**
     * Builds a four-field schema: an auto-id Int64 primary key "id", a float vector "vector"
     * of dimension {@code DIM}, and two VarChar fields ("path", "label") capped at 512.
     * The schema is printed before it is returned.
     *
     * @return the constructed collection schema
     */
    private CollectionSchemaParam buildSimpleCollection() {
        CollectionSchemaParam simpleSchema = CollectionSchemaParam.newBuilder()
                // primary key, auto-id enabled
                .addFieldType(FieldType.newBuilder()
                        .withName("id")
                        .withDataType(DataType.Int64)
                        .withPrimaryKey(true)
                        .withAutoID(true)
                        .build())
                // vector field
                .addFieldType(FieldType.newBuilder()
                        .withName("vector")
                        .withDataType(DataType.FloatVector)
                        .withDimension(DIM)
                        .build())
                // scalar fields
                .addFieldType(FieldType.newBuilder()
                        .withName("path")
                        .withDataType(DataType.VarChar)
                        .withMaxLength(512)
                        .build())
                .addFieldType(FieldType.newBuilder()
                        .withName("label")
                        .withDataType(DataType.VarChar)
                        .withMaxLength(512)
                        .build())
                .build();
        System.out.println(simpleSchema);
        return simpleSchema;
    }

    /**
     * Builds a schema with one field per supported scalar type plus a single vector field,
     * and optionally one Array field per element type. The dynamic field is disabled.
     *
     * <p>Field order: id, bool, int8, int16, int32, float, double, varchar, json, vector,
     * then (if requested) arrayInt64, arrayVarchar, arrayInt8, arrayInt16, arrayInt32,
     * arrayFloat, arrayDouble, arrayBool.
     *
     * @param binVec   {@code true} for a BinaryVector "vector" field, {@code false} for FloatVector;
     *                 either way the dimension is 128
     * @param hasArray whether to append the Array fields (each with max capacity 10)
     * @return the constructed collection schema
     */
    private static CollectionSchemaParam buildAllTypeSchema(boolean binVec, boolean hasArray) {
        CollectionSchemaParam.Builder builder = CollectionSchemaParam.newBuilder()
                .withEnableDynamicField(false);

        // primary key: ids are supplied by the caller (auto-id disabled)
        builder.addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build());

        // scalar fields that need nothing beyond a name and a type, in schema order
        String[] plainNames = {"bool", "int8", "int16", "int32", "float", "double"};
        DataType[] plainTypes = {DataType.Bool, DataType.Int8, DataType.Int16,
                DataType.Int32, DataType.Float, DataType.Double};
        for (int idx = 0; idx < plainNames.length; idx++) {
            builder.addFieldType(FieldType.newBuilder()
                    .withName(plainNames[idx])
                    .withDataType(plainTypes[idx])
                    .build());
        }

        builder.addFieldType(FieldType.newBuilder()
                .withName("varchar")
                .withDataType(DataType.VarChar)
                .withMaxLength(512)
                .build());
        builder.addFieldType(FieldType.newBuilder()
                .withName("json")
                .withDataType(DataType.JSON)
                .build());

        // vector field: only the data type depends on binVec
        builder.addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(binVec ? DataType.BinaryVector : DataType.FloatVector)
                .withDimension(128)
                .build());

        if (hasArray) {
            String[] arrayNames = {"arrayInt64", "arrayVarchar", "arrayInt8", "arrayInt16",
                    "arrayInt32", "arrayFloat", "arrayDouble", "arrayBool"};
            DataType[] elementTypes = {DataType.Int64, DataType.VarChar, DataType.Int8, DataType.Int16,
                    DataType.Int32, DataType.Float, DataType.Double, DataType.Bool};
            for (int idx = 0; idx < arrayNames.length; idx++) {
                FieldType.Builder arrayField = FieldType.newBuilder()
                        .withName(arrayNames[idx])
                        .withDataType(DataType.Array)
                        .withElementType(elementTypes[idx]);
                if (elementTypes[idx] == DataType.VarChar) {
                    // VarChar elements also carry a per-element length limit
                    arrayField = arrayField.withMaxLength(10);
                }
                builder.addFieldType(arrayField.withMaxCapacity(10).build());
            }
        }
        return builder.build();
    }

    /**
     * Fails fast when the shared client has not been initialized.
     *
     * <p>Every helper that issues an RPC calls this first.
     *
     * @throws IllegalStateException if {@code milvusClient} is {@code null}. This is a subclass of
     *                               {@code RuntimeException}, so existing catch blocks still apply.
     */
    private void checkMilvusClientIfExist() {
        if (milvusClient == null) {
            // A missing client is an invalid object state, not a generic runtime failure.
            throw new IllegalStateException(
                    "milvusClient is null. Please initialize it by calling createConnection() first before use.");
        }
    }
}
